/*
 * jETeL/CloverETL - Java based ETL application framework.
 * Copyright (c) Javlin, a.s. (info@cloveretl.com)
 *  
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
package org.jetel.data.formatter;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;

import org.jetel.data.DataRecord;
import org.jetel.data.Defaults;
import org.jetel.data.Token;
import org.jetel.exception.ComponentNotReadyException;
import org.jetel.metadata.DataFieldMetadata;
import org.jetel.metadata.DataFieldType;
import org.jetel.metadata.DataRecordMetadata;
import org.jetel.metadata.DataRecordParsingType;
import org.jetel.util.bytes.ByteBufferUtils;
import org.jetel.util.bytes.CloverBuffer;
import org.jetel.util.stream.SeekableOutputStream;

/**
 * Formatter that writes records through the inherited {@link CloverDataFormatter}
 * output, placing a {@value #RECORD_NUMBER_SIZE}-byte record number in front of
 * every serialized record.
 *
 * <p>The metadata handed to the parent formatter is a duplicate of the caller's
 * metadata with an additional {@code long} field named
 * {@value #RECORD_NUMBER_FIELD_NAME} inserted at index 0, so the written layout
 * matches the declared metadata.</p>
 *
 * <p>Typical usage is to call {@link #writeLong(long)} to stage the record number
 * and then {@link #write(DataRecord)} or {@link #writeDirect(CloverBuffer)} to emit
 * the record. If no number was staged, the buffer's initial content (all zero
 * bytes) is written.</p>
 */
public class CloverDebugFormatter extends CloverDataFormatter {

	/** Number of bytes occupied by the record number written before each record. */
	public static final int RECORD_NUMBER_SIZE = 8;
	/** Name of the metadata field that is prepended to hold the record number. */
	public static final String RECORD_NUMBER_FIELD_NAME = "__recordNumber";
	
	/** Holds the staged record number; always left ready for reading (position 0, limit 8). */
	private final ByteBuffer longBuffer = ByteBuffer.allocate(RECORD_NUMBER_SIZE);
	/** Scratch buffer for record serialization, allocated lazily on the first {@link #write(DataRecord)}. */
	private CloverBuffer tmpBuffer;
	/** Target passed to {@code setDataTarget} during {@link #init(DataRecordMetadata)}. */
	private final Object dataTarget;
	
	/**
	 * Creates a formatter writing into the given channel.
	 *
	 * <p>A {@link SeekableByteChannel} is wrapped in a {@link SeekableOutputStream}
	 * and {@code syncFlush} is switched on; any other channel is used directly.
	 * The compress level is set to {@code -1}.</p>
	 *
	 * @param channel channel the formatted data will be written to
	 */
	public CloverDebugFormatter(WritableByteChannel channel) {
		if (channel instanceof SeekableByteChannel) {
			this.dataTarget = new SeekableOutputStream((SeekableByteChannel) channel);
			this.syncFlush = true;
		} else {
			this.dataTarget = channel;
		}
		setCompressLevel(-1);
	}

	/**
	 * Initializes the parent formatter with a duplicate of {@code metadata} that has
	 * the record number field inserted at index 0, then attaches the data target
	 * chosen in the constructor.
	 *
	 * @param metadata metadata of the records that will be written; it is duplicated,
	 *                 the instance passed in is not modified by this method
	 * @throws ComponentNotReadyException if the parent initialization fails or if
	 *                 setting the data target throws an {@link IOException}
	 */
	@Override
	public void init(DataRecordMetadata metadata) throws ComponentNotReadyException {
		DataRecordMetadata duplicate = metadata.duplicate();
		if (metadata.getParsingType() == DataRecordParsingType.FIXEDLEN) {
			// third argument is presumably the fixed field size -- confirm against DataFieldMetadata
			duplicate.addField(0, new DataFieldMetadata(RECORD_NUMBER_FIELD_NAME, DataFieldType.LONG, 10));
		} else {
			String delimiter = (metadata.getFieldDelimiter() != null) ? null : "|"; // only set if no default delimiter
			duplicate.addField(0, new DataFieldMetadata(RECORD_NUMBER_FIELD_NAME, DataFieldType.LONG, delimiter));
		}
		
		super.init(duplicate);
		
		try {
			setDataTarget(dataTarget);
		} catch (IOException e) {
			throw new ComponentNotReadyException(e);
		}
	}

	/**
	 * Stages the record number that will be written in front of the records emitted
	 * by subsequent {@link #writeDirect(CloverBuffer)} calls. Nothing is written to
	 * the output here; the value stays staged until this method is called again.
	 *
	 * @param value record number to stage
	 * @return {@link #RECORD_NUMBER_SIZE}, the number of bytes the value will occupy
	 * @throws IOException never thrown by this implementation; declared so that the
	 *                     signature stays compatible with existing callers
	 */
	public int writeLong(long value) throws IOException {
		// clear() makes the put independent of whatever state the buffer was left in
		longBuffer.clear();
		longBuffer.putLong(value);
		longBuffer.flip();
		return RECORD_NUMBER_SIZE;
	}

	/**
	 * Serializes the record into an internal buffer and writes it via
	 * {@link #writeDirect(CloverBuffer)}.
	 *
	 * @param record record to write
	 * @return the value returned by {@link #writeDirect(CloverBuffer)}
	 * @throws IOException if writing to the output fails
	 */
	@Override
	public int write(DataRecord record) throws IOException {
		if (tmpBuffer == null) {
			tmpBuffer = CloverBuffer.allocate(Defaults.Record.RECORD_INITIAL_SIZE, Defaults.Record.RECORD_LIMIT_SIZE);
		}
		tmpBuffer.clear();
		record.serialize(tmpBuffer);
		tmpBuffer.flip();
		return writeDirect(tmpBuffer);
	}

	/**
	 * Writes one already serialized record: an encoded length, the staged record
	 * number and then the remaining content of {@code recordBuffer}.
	 *
	 * <p>The encoded length covers the record content plus
	 * {@link #RECORD_NUMBER_SIZE} bytes for the record number.</p>
	 *
	 * @param recordBuffer buffer positioned at the start of the serialized record
	 * @return record content size plus {@link #RECORD_NUMBER_SIZE} plus the value
	 *         returned by {@code ByteBufferUtils.encodeLength} (presumably the number
	 *         of bytes used by the length prefix)
	 * @throws IOException if writing to the output fails
	 */
	@Override
	public int writeDirect(CloverBuffer recordBuffer) throws IOException {
		if (isJobflow) {
			// CLO-2657: file generated by a jobflow would not be readable by a graph
			Token.deserializeTokenId(recordBuffer); // do not serialize token ID
		}
		
		output.markRecordStart();
		
		int recordSize = recordBuffer.remaining() + RECORD_NUMBER_SIZE;
		final int lenbytes = ByteBufferUtils.encodeLength(output, recordSize);
		
		serializeRecordNumber();
		output.write(recordBuffer);
		
		return recordSize + lenbytes;
	}
	
	/**
	 * Writes the staged record number to the output and makes the buffer readable
	 * again, so the same number is emitted if no new one is staged.
	 *
	 * @throws IOException if writing to the output fails
	 */
	private void serializeRecordNumber() throws IOException {
		output.write(longBuffer);
		// rewind() rather than flip(): flip() would shrink the limit to the current
		// position if the write did not consume the whole buffer
		longBuffer.rewind();
	}
}
